/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.PendingSegmentRecord;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.MatcherAssert;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
  private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
  private static final String DATASOURCE = "testDS";
  private static final String STREAM = "stream";
  private static final String SHARD_ID = "0";
  private static final StreamPartition<String> SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID);
  private static final String EXCEPTION_MSG = "I had an exception";
  private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", "v1", "k2", 20);

  private TaskStorage taskStorage;
  private TaskMaster taskMaster;
  private TaskRunner taskRunner;
  private TaskQueue taskQueue;
  private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
  private SeekableStreamIndexTaskClientFactory taskClientFactory;
  private SeekableStreamSupervisorSpec spec;
  private SeekableStreamIndexTaskClient indexTaskClient;
  private RecordSupplier<String, String, ByteEntity> recordSupplier;

  private RowIngestionMetersFactory rowIngestionMetersFactory;
  private SupervisorStateManagerConfig supervisorConfig;

  private StubServiceEmitter emitter;

  @Before
  public void setupTest()
  {
    taskStorage = createMock(TaskStorage.class);
    taskMaster = createMock(TaskMaster.class);
    taskRunner = createMock(TaskRunner.class);
    taskQueue = createMock(TaskQueue.class);
    indexerMetadataStorageCoordinator = createMock(IndexerMetadataStorageCoordinator.class);
    taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class);
    spec = createMock(SeekableStreamSupervisorSpec.class);
    indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
    recordSupplier = createMock(RecordSupplier.class);

    rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();

    supervisorConfig = new SupervisorStateManagerConfig();

    emitter = new StubServiceEmitter("test-supervisor-state", "localhost");

    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();

    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();

    EasyMock.expect(taskClientFactory.build(
        EasyMock.anyString(),
        EasyMock.anyObject(),
        EasyMock.anyInt(),
        EasyMock.anyObject()
    )).andReturn(
        indexTaskClient).anyTimes();
    EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
    EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();

    taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
    EasyMock.expectLastCall().times(0, 1);

    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
    EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
    EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes();
  }

  @Test
  public void testRunning()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testRunningStreamGetSequenceNumberReturnsNull()
  {
    EasyMock.reset(recordSupplier);
    EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
    EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn(null).anyTimes();
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
    Assert.assertEquals(1, exceptionEvents.size());
    Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
    Assert.assertEquals(ISE.class.getName(), exceptionEvents.get(0).getExceptionClass());
    Assert.assertEquals(StringUtils.format("unable to fetch sequence number for partition[%s] from stream", SHARD_ID), exceptionEvents.get(0).getMessage());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testConnectingToStreamFail()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
            .andThrow(new StreamException(new IllegalStateException(EXCEPTION_MSG)))
            .anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
    Assert.assertEquals(1, exceptionEvents.size());
    Assert.assertTrue(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
    Assert.assertEquals(
        StringUtils.format("%s: %s", IllegalStateException.class.getName(), EXCEPTION_MSG),
        exceptionEvents.get(0).getMessage()
    );
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testConnectingToStreamFailRecoveryFailRecovery()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
            .andThrow(new StreamException(new IllegalStateException()))
            .times(3);
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
            .andThrow(new StreamException(new IllegalStateException()))
            .times(3);
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).times(3);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CONNECTING_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    supervisor.runInternal();
    Assert.assertEquals(SeekableStreamState.UNABLE_TO_CONNECT_TO_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    supervisor.runInternal();
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    supervisor.runInternal();
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    supervisor.runInternal();
    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.LOST_CONTACT_WITH_STREAM, supervisor.stateManager.getSupervisorState());
    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testDiscoveringInitialTasksFailRecoveryFail()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).times(6);
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
    supervisor.start();

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
    Assert.assertEquals(1, exceptionEvents.size());
    Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
    Assert.assertEquals(EXCEPTION_MSG, exceptionEvents.get(0).getMessage());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.DISCOVERING_INITIAL_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testIdleStateTransition() throws Exception
  {
    EasyMock.reset(spec);
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
        "stream",
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
        1,
        1,
        new Period("PT1H"),
        new Period("PT1S"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        null,
        null,
        new IdleConfig(true, 200L),
        null
    )
    {
    }).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
      @Override
      public Duration getEmissionDuration()
      {
        return new Period("PT1S").toStandardDuration();
      }
    }).anyTimes();
    EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    Thread.sleep(100L);
    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    Thread.sleep(100L);
    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    Thread.sleep(100L);
    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.IDLE, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testCreatingTasksFailRecoveryFail()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).times(3);
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andThrow(new IllegalStateException(EXCEPTION_MSG)).times(3);
    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(ImmutableList.of()).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
    supervisor.start();

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    List<SupervisorStateManager.ExceptionEvent> exceptionEvents = supervisor.stateManager.getExceptionEvents();
    Assert.assertEquals(1, exceptionEvents.size());
    Assert.assertFalse(((SeekableStreamExceptionEvent) exceptionEvents.get(0)).isStreamException());
    Assert.assertEquals(IllegalStateException.class.getName(), exceptionEvents.get(0).getExceptionClass());
    Assert.assertEquals(EXCEPTION_MSG, exceptionEvents.get(0).getMessage());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(SeekableStreamState.CREATING_TASKS, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(2, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertEquals(3, supervisor.stateManager.getExceptionEvents().size());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();
    Assert.assertFalse(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.UNHEALTHY_SUPERVISOR, supervisor.stateManager.getSupervisorState());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testSuspended()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    verifyAll();
  }

  @Test
  public void testStopping()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    taskRunner.unregisterListener("testSupervisorId");
    indexTaskClient.close();
    recordSupplier.close();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.stop(false);

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());

    verifyAll();
  }

  @Test
  public void testStoppingGracefully()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    taskRunner.unregisterListener("testSupervisorId");
    indexTaskClient.close();
    recordSupplier.close();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.RUNNING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertTrue(supervisor.stateManager.isAtLeastOneSuccessfulRun());

    supervisor.stop(true);

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());

    // Subsequent run after graceful shutdown has begun
    supervisor.runInternal();
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.STOPPING, supervisor.stateManager.getSupervisorState().getBasicState());

    verifyAll();
  }

  @Test(timeout = 60_000L)
  public void testCheckpointForActiveTaskGroup() throws InterruptedException, JsonProcessingException
  {
    DateTime startTime = DateTimes.nowUtc();
    SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
            STREAM,
            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
            1,
            1,
            new Period("PT1H"),
            new Period("PT1S"),
            new Period("PT30S"),
            false,
            new Period("PT30M"),
            null,
            null,
            null,
            null,
            new IdleConfig(true, 200L),
            null
    ) {};

    EasyMock.reset(spec);
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
      @Override
      public Duration getEmissionDuration()
      {
        return new Period("PT2S").toStandardDuration();
      }
    }).anyTimes();
    EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
    EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfigExt(
            0,
            Collections.singletonMap("0", "10"),
            Collections.singletonMap("0", "20"),
            "test",
            startTime,
            null,
            Collections.emptySet(),
            ioConfig
    );

    SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

    TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
    sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

    Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

    TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
            "id1",
            null,
            getDataSchema(),
            taskTuningConfig,
            taskIoConfig,
            context,
            "0"
    );

    TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
            "id2",
            null,
            getDataSchema(),
            taskTuningConfig,
            taskIoConfig,
            context,
            "0"
    );

    TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
        "id3",
        null,
        getDataSchema(),
        taskTuningConfig,
        taskIoConfig,
        context,
        "0"
    );

    final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
    final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
    final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);

    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
    workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));

    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(id1, id2, id3))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();

    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.PAUSED))
            .anyTimes();

    EasyMock.expect(indexTaskClient.getStartTimeAsync("id1")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
    EasyMock.expect(indexTaskClient.getStartTimeAsync("id2")).andReturn(Futures.immediateFuture(startTime)).anyTimes();
    EasyMock.expect(indexTaskClient.getStartTimeAsync("id3")).andReturn(Futures.immediateFuture(startTime)).anyTimes();

    ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, partitionOffset);

    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id1"))
            .andReturn(Futures.immediateFuture(partitionOffset))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id2"))
            .andReturn(Futures.immediateFuture(partitionOffset))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id3"))
            .andReturn(Futures.immediateFuture(partitionOffset))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id1"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id2"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id3"))
            .andReturn(Futures.immediateFuture(false))
            .anyTimes();
    EasyMock.expect(indexTaskClient.stopAsync("id1", false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.stopAsync("id2", false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();

    taskQueue.shutdown(
        "id3",
        "Killing forcefully as task could not be resumed in the"
        + " first supervisor run after Overlord change."
    );
    EasyMock.expectLastCall().atLeastOnce();

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.runInternal();

    supervisor.checkpoint(
            0,
            new TestSeekableStreamDataSourceMetadata(
                    new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
            )
    );

    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }

    verifyAll();

    Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
  }

  @Test(timeout = 60_000L)
  public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws InterruptedException, JsonProcessingException
  {
    // Assuming tasks have surpassed their duration limit at test execution
    DateTime startTime = DateTimes.nowUtc().minusHours(2);
    // Configure supervisor to stop only one task at a time
    int stopTaskCount = 1;
    SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
        STREAM,
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
        1,
        3,
        new Period("PT1H"),
        new Period("PT1S"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        null,
        null,
        new IdleConfig(true, 200L),
        stopTaskCount
    )
    {
    };

    EasyMock.reset(spec);
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
    {
      @Override
      public Duration getEmissionDuration()
      {
        return new Period("PT2S").toStandardDuration();
      }
    }).anyTimes();
    EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
    EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

    TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
    sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

    Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

    TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
        "id1",
        null,
        getDataSchema(),
        taskTuningConfig,
        createTaskIoConfigExt(
            0,
            Collections.singletonMap("0", "10"),
            Collections.singletonMap("0", "20"),
            "test",
            startTime,
            null,
            Collections.emptySet(),
            ioConfig
        ),
        context,
        "0"
    );

    TestSeekableStreamIndexTask id2 = new TestSeekableStreamIndexTask(
        "id2",
        null,
        getDataSchema(),
        taskTuningConfig,
        createTaskIoConfigExt(
            1,
            Collections.singletonMap("1", "10"),
            Collections.singletonMap("1", "20"),
            "test",
            startTime,
            null,
            Collections.emptySet(),
            ioConfig
        ),
        context,
        "1"
    );

    TestSeekableStreamIndexTask id3 = new TestSeekableStreamIndexTask(
        "id3",
        null,
        getDataSchema(),
        taskTuningConfig,
        createTaskIoConfigExt(
            2,
            Collections.singletonMap("2", "10"),
            Collections.singletonMap("2", "20"),
            "test",
            startTime,
            null,
            Collections.emptySet(),
            ioConfig
        ),
        context,
        "2"
    );

    final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);
    final TaskLocation location2 = TaskLocation.create("testHost2", 145, -1);
    final TaskLocation location3 = TaskLocation.create("testHost3", 145, -1);

    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));
    workItems.add(new TestTaskRunnerWorkItem(id2, null, location2));
    workItems.add(new TestTaskRunnerWorkItem(id3, null, location3));

    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
    EasyMock.expect(taskRunner.getTaskLocation(id2.getId())).andReturn(location2).anyTimes();
    EasyMock.expect(taskRunner.getTaskLocation(id3.getId())).andReturn(location3).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
            .andReturn(ImmutableList.of(id1, id2, id3))
            .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id2")).andReturn(Optional.of(TaskStatus.running("id2"))).anyTimes();
    EasyMock.expect(taskStorage.getStatus("id3")).andReturn(Optional.of(TaskStatus.running("id3"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id2")).andReturn(Optional.of(id2)).anyTimes();
    EasyMock.expect(taskStorage.getTask("id3")).andReturn(Optional.of(id2)).anyTimes();

    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
            .andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id2"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id3"))
            .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
            .anyTimes();

    EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
            .andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
            .anyTimes();
    // Mocking to return the earliest start time for task id2, indicating it's the first group to start
    EasyMock.expect(indexTaskClient.getStartTimeAsync("id2"))
            .andReturn(Futures.immediateFuture(startTime)).anyTimes();
    EasyMock.expect(indexTaskClient.getStartTimeAsync("id3"))
            .andReturn(Futures.immediateFuture(startTime.plusSeconds(2)))
            .anyTimes();

    ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, partitionOffset);

    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id2"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id3"), EasyMock.anyBoolean()))
            .andReturn(Futures.immediateFuture(checkpoints))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id1", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id2", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.setEndOffsetsAsync("id3", partitionOffset, false))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id1"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id2"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.resumeAsync("id3"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id1"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id2"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();
    EasyMock.expect(indexTaskClient.pauseAsync("id3"))
            .andReturn(Futures.immediateFuture(true))
            .anyTimes();

    // Expect the earliest-started task (id2) to transition to publishing first
    taskQueue.shutdown("id2", "All tasks in group[%s] failed to transition to publishing state", 1);

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.runInternal();

    supervisor.checkpoint(
        0,
        new TestSeekableStreamDataSourceMetadata(
            new SeekableStreamStartSequenceNumbers<>(STREAM, checkpoints.get(0), ImmutableSet.of())
        )
    );

    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }

    verifyAll();

    Assert.assertTrue(supervisor.getNoticesQueueSize() == 0);
  }

  @Test(timeout = 10_000L)
  public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, InterruptedException
  {
    DateTime startTime = DateTimes.nowUtc();
    SeekableStreamSupervisorIOConfig ioConfig = new SeekableStreamSupervisorIOConfig(
        STREAM,
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
        1,
        1,
        new Period("PT1H"),
        new Period("PT1S"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        null,
        null,
        new IdleConfig(true, 200L),
        null
    )
    {
    };

    EasyMock.reset(spec);
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(ioConfig).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()
    {
      @Override
      public Duration getEmissionDuration()
      {
        return new Period("PT2S").toStandardDuration();
      }
    }).anyTimes();
    EasyMock.expect(spec.getType()).andReturn("stream").anyTimes();
    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
    EasyMock.expect(spec.getContextValue("tags")).andReturn("").anyTimes();
    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    SeekableStreamIndexTaskTuningConfig taskTuningConfig = getTuningConfig().convertToTaskTuningConfig();

    TreeMap<Integer, Map<String, Long>> sequenceOffsets = new TreeMap<>();
    sequenceOffsets.put(0, ImmutableMap.of("0", 10L, "1", 20L));

    Map<String, Object> context = new HashMap<>();
    context.put("checkpoints", new ObjectMapper().writeValueAsString(sequenceOffsets));

    TestSeekableStreamIndexTask id1 = new TestSeekableStreamIndexTask(
        "id1",
        null,
        getDataSchema(),
        taskTuningConfig,
        createTaskIoConfigExt(
            0,
            Collections.singletonMap("0", "10"),
            Collections.singletonMap("0", "20"),
            "test",
            startTime,
            null,
            Collections.emptySet(),
            ioConfig
        ),
        context,
        "0"
    );

    final TaskLocation location1 = TaskLocation.create("testHost", 1234, -1);

    Collection workItems = new ArrayList<>();
    workItems.add(new TestTaskRunnerWorkItem(id1, null, location1));

    EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
    EasyMock.expect(taskRunner.getTaskLocation(id1.getId())).andReturn(location1).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE))
        .andReturn(ImmutableList.of(id1))
        .anyTimes();
    EasyMock.expect(taskStorage.getStatus("id1")).andReturn(Optional.of(TaskStatus.running("id1"))).anyTimes();
    EasyMock.expect(taskStorage.getTask("id1")).andReturn(Optional.of(id1)).anyTimes();

    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
        .andReturn(new TestSeekableStreamDataSourceMetadata(null)).anyTimes();
    EasyMock.expect(indexTaskClient.getStatusAsync("id1"))
        .andReturn(Futures.immediateFuture(SeekableStreamIndexTaskRunner.Status.READING))
        .anyTimes();

    EasyMock.expect(indexTaskClient.getStartTimeAsync("id1"))
        .andReturn(Futures.immediateFuture(startTime.plusSeconds(1)))
        .anyTimes();

    ImmutableMap<String, String> partitionOffset = ImmutableMap.of("0", "10");
    final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
    checkpoints.put(0, partitionOffset);

    EasyMock.expect(indexTaskClient.getCheckpointsAsync(EasyMock.contains("id1"), EasyMock.anyBoolean()))
        .andReturn(Futures.immediateFuture(checkpoints))
        .anyTimes();

    // The task should only be pause/resumed in one of the runInternal commands, after stopTaskGroupEarly has been called.
    EasyMock.expect(indexTaskClient.resumeAsync("id1"))
        .andReturn(Futures.immediateFuture(true))
        .once();
    EasyMock.expect(indexTaskClient.pauseAsync("id1"))
        .andReturn(Futures.immediateFuture(true))
        .once();
    taskQueue.shutdown("id1", "All tasks in group[%s] failed to transition to publishing state", 0);

    replayAll();

    SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.runInternal();
    supervisor.handoffTaskGroupsEarly(ImmutableList.of(0));

    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }
    supervisor.runInternal();
    verifyAll();
  }

  @Test
  public void testEmitBothLag() throws Exception
  {
    expectEmitterSupervisor(false);

    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.LAG,
        ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
        ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
    );


    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());


    latch.await();

    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, METRIC_TAGS);
    emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
    emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
    emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
    emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
    emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
    emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);
    verifyAll();
  }

  @Test
  public void testEmitRecordLag() throws Exception
  {
    expectEmitterSupervisor(false);

    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.LAG,
        ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
        null
    );

    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());


    latch.await();

    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, METRIC_TAGS);
    emitter.verifyValue("ingest/test/lag", dimFilters, 850L);
    emitter.verifyValue("ingest/test/maxLag", dimFilters, 500L);
    emitter.verifyValue("ingest/test/avgLag", dimFilters, 283L);
    verifyAll();
  }

  @Test
  public void testEmitTimeLag() throws Exception
  {
    expectEmitterSupervisor(false);

    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.LAG,
        null,
        ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
    );


    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());


    latch.await();

    final Map<String, Object> dimFilters = ImmutableMap.of(DruidMetrics.TAGS, METRIC_TAGS);
    emitter.verifyValue("ingest/test/lag/time", dimFilters, 45000L);
    emitter.verifyValue("ingest/test/maxLag/time", dimFilters, 20000L);
    emitter.verifyValue("ingest/test/avgLag/time", dimFilters, 15000L);

    verifyAll();
  }

  @Test
  public void testEmitNoticesQueueSize() throws Exception
  {
    expectEmitterSupervisor(false);

    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.NOTICE_QUEUE,
        null,
        null
    );


    supervisor.start();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());


    latch.await();

    final Map<String, Object> dimFilters = ImmutableMap.of(
        DruidMetrics.TAGS, METRIC_TAGS,
        DruidMetrics.DATASOURCE, "testDS"
    );
    emitter.verifyValue("ingest/notices/queueSize", dimFilters, 0);

    verifyAll();
  }

  @Test
  public void testEmitNoticesTime() throws Exception
  {
    expectEmitterSupervisor(false);
    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.NOTICE_PROCESS,
        null,
        null
    );
    supervisor.start();
    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
    Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());
    latch.await();

    final Map<String, Object> dimFilters = ImmutableMap.of(
        DruidMetrics.TAGS, METRIC_TAGS,
        DruidMetrics.DATASOURCE, "testDS",
        "noticeType", "run_notice"
    );
    long observedNoticeTime = emitter.getValue("ingest/notices/time", dimFilters).longValue();
    Assert.assertTrue(observedNoticeTime > 0);

    verifyAll();
  }

  @Test
  public void testEmitNoLagWhenSuspended() throws Exception
  {
    expectEmitterSupervisor(true);

    CountDownLatch latch = new CountDownLatch(1);
    TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.LAG,
        ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
        ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
    );


    supervisor.start();
    supervisor.runInternal();

    Assert.assertTrue(supervisor.stateManager.isHealthy());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState());
    Assert.assertEquals(BasicState.SUSPENDED, supervisor.stateManager.getSupervisorState().getBasicState());
    Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());


    latch.await();

    emitter.verifyNotEmitted("ingest/test/lag");
    emitter.verifyNotEmitted("ingest/test/maxLag");
    emitter.verifyNotEmitted("ingest/test/avgLag");
    emitter.verifyNotEmitted("ingest/test/lag/time");
    emitter.verifyNotEmitted("ingest/test/maxLag/time");
    emitter.verifyNotEmitted("ingest/test/avgLag/time");

    verifyAll();
  }

  @Test
  public void testGetStats()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.expect(indexTaskClient.getMovingAveragesAsync("task1"))
            .andReturn(Futures.immediateFuture(ImmutableMap.of("prop1", "val1")))
            .times(1);
    EasyMock.expect(indexTaskClient.getMovingAveragesAsync("task2"))
            .andReturn(Futures.immediateFuture(ImmutableMap.of("prop2", "val2")))
            .times(1);
    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );
    Map<String, Map<String, Object>> stats = supervisor.getStats();

    verifyAll();

    Assert.assertEquals(1, stats.size());
    Assert.assertEquals(ImmutableSet.of("0"), stats.keySet());
    Assert.assertEquals(
        ImmutableMap.of(
            "task1", ImmutableMap.of("prop1", "val1"),
            "task2", ImmutableMap.of("prop2", "val2")
        ),
        stats.get("0")
    );
  }

  @Test
  public void testSupervisorResetAllWithCheckpoints() throws InterruptedException
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(
        true
    );
    taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset");
    EasyMock.expectLastCall();
    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.reset(null);
    validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0);
  }

  @Test
  public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws InterruptedException, IOException
  {
    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "0", "1", "10", "2", "20", "3", "30");
    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "1000", "2", "2500");
    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "1000", "1", "10", "2", "2500", "3", "30");

    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new TestSeekableStreamDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<>(
                STREAM,
                checkpointOffsets
            )
        )
    );
    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            expectedOffsets
        ))
    )).andReturn(
        true
    );

    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        checkpointOffsets,
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            resetOffsets
        )
    );

    Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.resetOffsets(resetMetadata);

    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
  }

  @Test
  public void testRegisterNewVersionOfPendingSegment()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);

    Capture<PendingSegmentRecord> captured0 = Capture.newInstance(CaptureType.FIRST);
    Capture<PendingSegmentRecord> captured1 = Capture.newInstance(CaptureType.FIRST);
    EasyMock.expect(
        indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task0"), EasyMock.capture(captured0))
    ).andReturn(Futures.immediateFuture(true));
    EasyMock.expect(
        indexTaskClient.registerNewVersionOfPendingSegmentAsync(EasyMock.eq("task2"), EasyMock.capture(captured1))
    ).andReturn(Futures.immediateFuture(true));

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    // Spin off two active tasks with each task serving one partition.
    supervisor.getIoConfig().setTaskCount(3);
    supervisor.start();

    final SeekableStreamSupervisor.TaskGroup taskGroup0 = supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task0"),
        ImmutableSet.of()
    );
    final SeekableStreamSupervisor.TaskGroup taskGroup1 = supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );
    final SeekableStreamSupervisor.TaskGroup taskGroup2 = supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("2"),
        ImmutableMap.of("2", "100"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    final PendingSegmentRecord pendingSegmentRecord0 = new PendingSegmentRecord(
        new SegmentIdWithShardSpec(
            "DS",
            Intervals.of("2024/2025"),
            "2024",
            new NumberedShardSpec(1, 0)
        ),
        taskGroup0.getBaseSequenceName(),
        "prevId0",
        "someAppendedSegment0",
        taskGroup0.getBaseSequenceName()
    );
    final PendingSegmentRecord pendingSegmentRecord1 = new PendingSegmentRecord(
        new SegmentIdWithShardSpec(
            "DS",
            Intervals.of("2024/2025"),
            "2024",
            new NumberedShardSpec(2, 0)
        ),
        taskGroup2.getBaseSequenceName(),
        "prevId1",
        "someAppendedSegment1",
        taskGroup2.getBaseSequenceName()
    );

    supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord0);
    supervisor.registerNewVersionOfPendingSegment(pendingSegmentRecord1);

    Assert.assertEquals(pendingSegmentRecord0, captured0.getValue());
    Assert.assertEquals(pendingSegmentRecord1, captured1.getValue());
    verifyAll();
  }

  @Test
  public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
  {
    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "100");
    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "10", "1", "8", "2", "100");

    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new TestSeekableStreamDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<>(
                STREAM,
                checkpointOffsets
            )
        )
    );
    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            expectedOffsets
        )
    ))).andReturn(true);
    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    // Spin off two active tasks with each task serving one partition.
    supervisor.getIoConfig().setTaskCount(3);
    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("2"),
        ImmutableMap.of("2", "100"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task3"),
        ImmutableSet.of()
    );

    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            resetOffsets
        )
    );

    Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.resetOffsets(resetMetadata);

    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
  }

  @Test
  public void testSupervisorResetOffsetsWithNoCheckpoints() throws InterruptedException
  {
    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("0", "10", "1", "8");
    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.copyOf(resetOffsets);

    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null);
    EasyMock.expect(indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            expectedOffsets
        )
    ))).andReturn(true);
    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    // Spin off three active tasks with each task serving one partition.
    supervisor.getIoConfig().setTaskCount(3);
    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("2"),
        ImmutableMap.of("2", "100"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task3"),
        ImmutableSet.of()
    );

    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            resetOffsets
        )
    );

    Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.resetOffsets(resetMetadata);

    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
  }


  @Test
  public void testSupervisorResetWithNoPartitions() throws IOException, InterruptedException
  {
    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of();
    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6");

    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new TestSeekableStreamDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<>(
                STREAM,
                checkpointOffsets
            )
        )
    );
    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            expectedOffsets
        )
    ))).andReturn(true);

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    // Spin off two active tasks with each task serving one partition.
    supervisor.getIoConfig().setTaskCount(2);
    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            resetOffsets
        )
    );

    Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.resetOffsets(resetMetadata);

    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2);
  }

  @Test
  public void testSupervisorResetWithNewPartition() throws IOException, InterruptedException
  {
    final ImmutableMap<String, String> checkpointOffsets = ImmutableMap.of("0", "5", "1", "6");
    final ImmutableMap<String, String> resetOffsets = ImmutableMap.of("2", "20");
    final ImmutableMap<String, String> expectedOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "20");

    EasyMock.expect(spec.isSuspended()).andReturn(false);
    EasyMock.reset(indexerMetadataStorageCoordinator);
    EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
        new TestSeekableStreamDataSourceMetadata(
            new SeekableStreamEndSequenceNumbers<>(
                STREAM,
                checkpointOffsets
            )
        )
    );
    EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "stream",
            expectedOffsets
        )
    ))).andReturn(true);
    taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called");
    EasyMock.expectLastCall();

    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    // Spin off two active tasks with each task serving one partition.
    supervisor.getIoConfig().setTaskCount(2);
    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "5"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "6"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            STREAM,
            resetOffsets
        )
    );

    Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(0, supervisor.getNoticesQueueSize());
    Assert.assertEquals(0, supervisor.getPartitionOffsets().size());

    supervisor.resetOffsets(resetMetadata);

    validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1);
  }

  @Test
  public void testSupervisorNoResetDataSourceMetadata()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);
    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    verifyAll();

    MatcherAssert.assertThat(
        Assert.assertThrows(DruidException.class, () ->
            supervisor.resetOffsets(null)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            "Reset dataSourceMetadata is required for resetOffsets."
        )
    );
  }

  @Test
  public void testSupervisorResetWithInvalidStartSequenceMetadata()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);
    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    verifyAll();

    final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamStartSequenceNumbers<>(
            "i-am-not-real",
            ImmutableMap.of("0", "10", "1", "20", "2", "30"),
            ImmutableSet.of()
        )
    );

    MatcherAssert.assertThat(
        Assert.assertThrows(DruidException.class, () ->
            supervisor.resetOffsets(dataSourceMetadata)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            StringUtils.format(
                "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[SeekableStreamEndSequenceNumbers], but found[SeekableStreamStartSequenceNumbers].",
                dataSourceMetadata
            )
        )
    );
  }

  @Test
  public void testSupervisorResetInvalidStream()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false);
    replayAll();

    final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

    supervisor.start();
    supervisor.addTaskGroupToActivelyReadingTaskGroup(
        supervisor.getTaskGroupIdForPartition("0"),
        ImmutableMap.of("0", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task1"),
        ImmutableSet.of()
    );

    supervisor.addTaskGroupToPendingCompletionTaskGroup(
        supervisor.getTaskGroupIdForPartition("1"),
        ImmutableMap.of("1", "0"),
        Optional.absent(),
        Optional.absent(),
        ImmutableSet.of("task2"),
        ImmutableSet.of()
    );

    verifyAll();

    final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata(
        new SeekableStreamEndSequenceNumbers<>(
            "i-am-not-real",
            ImmutableMap.of("0", "10", "1", "20", "2", "30")
        )
    );

    MatcherAssert.assertThat(
        Assert.assertThrows(DruidException.class, () ->
            supervisor.resetOffsets(dataSourceMetadata)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            "Stream[i-am-not-real] doesn't exist in the supervisor[testSupervisorId]. Supervisor is consuming stream[stream]."
        )
    );
  }

  @Test
  public void testStaleOffsetsNegativeLagNotEmitted() throws Exception
  {
    expectEmitterSupervisor(false);

    CountDownLatch latch = new CountDownLatch(1);

    final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
        latch,
        TestEmittingTestSeekableStreamSupervisor.LAG,
        // Record lag must not be emitted
        ImmutableMap.of("0", 10L, "1", -100L),
        null
    );
    supervisor.start();
    // Forcibly set the offsets to be stale
    supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE);

    latch.await();

    supervisor.emitLag();
    Assert.assertEquals(0, emitter.getEvents().size());
  }

  private void validateSupervisorStateAfterResetOffsets(
      final TestSeekableStreamSupervisor supervisor,
      final ImmutableMap<String, String> expectedResetOffsets,
      final int expectedActiveTaskCount
  ) throws InterruptedException
  {
    // Wait for the notice queue to be drained asynchronously before we validate the supervisor's final state.
    while (supervisor.getNoticesQueueSize() > 0) {
      Thread.sleep(100);
    }
    Thread.sleep(1000);
    Assert.assertEquals(expectedActiveTaskCount, supervisor.getActiveTaskGroupsCount());
    Assert.assertEquals(expectedResetOffsets.size(), supervisor.getPartitionOffsets().size());
    for (Map.Entry<String, String> entry : expectedResetOffsets.entrySet()) {
      Assert.assertEquals(supervisor.getNotSetMarker(), supervisor.getPartitionOffsets().get(entry.getKey()));
    }
    verifyAll();
  }

  @Test
  public void testScheduleReporting()
  {
    EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
    DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2);
    ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
    EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
    EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);

    EasyMock.replay(executorService, spec);
    final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
    {
      @Override
      public LagStats computeLagStats()
      {
        return new LagStats(0, 0, 0);
      }
    };
    supervisor.scheduleReporting(executorService);
    EasyMock.verify(executorService, spec);
  }

  private void expectEmitterSupervisor(boolean suspended)
  {
    spec = createMock(SeekableStreamSupervisorSpec.class);
    EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
    EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
    EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig(
        "stream",
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
        1,
        1,
        new Period("PT1H"),
        new Period("PT1S"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        null,
        null,
        null,
        null
    )
    {
    }).anyTimes();
    EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
    EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
    EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() {
      @Override
      public Duration getEmissionDuration()
      {
        return new Period("PT1S").toStandardDuration();
      }
    }).anyTimes();
    EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes();
    EasyMock.expect(spec.getType()).andReturn("test").anyTimes();
    EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();

    EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
    EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
    EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();

    replayAll();
  }

  private static DataSchema getDataSchema()
  {
    List<DimensionSchema> dimensions = new ArrayList<>();
    dimensions.add(StringDimensionSchema.create("dim1"));
    dimensions.add(StringDimensionSchema.create("dim2"));

    return new DataSchema(
        DATASOURCE,
        new TimestampSpec("timestamp", "iso", null),
        new DimensionsSpec(dimensions),
        new AggregatorFactory[]{new CountAggregatorFactory("rows")},
        new UniformGranularitySpec(
            Granularities.HOUR,
            Granularities.NONE,
            ImmutableList.of()
        ),
        null
    );
  }

  private static SeekableStreamSupervisorIOConfig getIOConfig()
  {
    return new SeekableStreamSupervisorIOConfig(
        "stream",
        new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false),
        1,
        1,
        new Period("PT1H"),
        new Period("P1D"),
        new Period("PT30S"),
        false,
        new Period("PT30M"),
        null,
        null,
        OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
        null,
        null,
        null
    )
    {
    };
  }

  private static Map<String, Object> getProperties()
  {
    HashMap<String, Object> autoScalerConfig = new HashMap<>();
    autoScalerConfig.put("enableTaskAutoScaler", true);
    autoScalerConfig.put("lagCollectionIntervalMillis", 500);
    autoScalerConfig.put("lagCollectionRangeMillis", 500);
    autoScalerConfig.put("scaleOutThreshold", 5000000);
    autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
    autoScalerConfig.put("scaleInThreshold", 1000000);
    autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
    autoScalerConfig.put("scaleActionStartDelayMillis", 0);
    autoScalerConfig.put("scaleActionPeriodMillis", 100);
    autoScalerConfig.put("taskCountMax", 8);
    autoScalerConfig.put("taskCountMin", 1);
    autoScalerConfig.put("scaleInStep", 1);
    autoScalerConfig.put("scaleOutStep", 2);
    autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
    return autoScalerConfig;
  }

  private static SeekableStreamSupervisorTuningConfig getTuningConfig()
  {
    return new SeekableStreamSupervisorTuningConfig()
    {
      @Override
      public Integer getWorkerThreads()
      {
        return 1;
      }

      @Override
      public Long getChatRetries()
      {
        return 1L;
      }

      @Override
      public Duration getHttpTimeout()
      {
        return new Period("PT1M").toStandardDuration();
      }

      @Override
      public Duration getShutdownTimeout()
      {
        return new Period("PT1S").toStandardDuration();
      }

      @Override
      public Duration getRepartitionTransitionDuration()
      {
        return new Period("PT2M").toStandardDuration();
      }

      @Override
      public Duration getOffsetFetchPeriod()
      {
        return new Period("PT5M").toStandardDuration();
      }

      @Override
      public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
      {
        return new SeekableStreamIndexTaskTuningConfig(
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null
        )
        {
          @Override
          public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
          {
            return null;
          }

          @Override
          public String toString()
          {
            return null;
          }
        };
      }
    };
  }

  private class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
  {
    public TestSeekableStreamIndexTask(
        String id,
        @Nullable TaskResource taskResource,
        DataSchema dataSchema,
        SeekableStreamIndexTaskTuningConfig tuningConfig,
        SeekableStreamIndexTaskIOConfig<String, String> ioConfig,
        @Nullable Map<String, Object> context,
        @Nullable String groupId
    )
    {
      super(
          id,
          taskResource,
          dataSchema,
          tuningConfig,
          ioConfig,
          context,
          groupId
      );
    }

    @Override
    protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
    {
      return null;
    }

    @Override
    protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
    {
      return recordSupplier;
    }

    @Override
    public String getType()
    {
      return "test";
    }
  }

  private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
  {
    private BaseTestSeekableStreamSupervisor()
    {
      super(
          "testSupervisorId",
          taskStorage,
          taskMaster,
          indexerMetadataStorageCoordinator,
          taskClientFactory,
          OBJECT_MAPPER,
          spec,
          rowIngestionMetersFactory,
          false
      );
    }

    @Override
    protected String baseTaskName()
    {
      return "test";
    }

    @Override
    protected void updatePartitionLagFromStream()
    {
      // do nothing
    }

    @Nullable
    @Override
    protected Map<String, Long> getPartitionRecordLag()
    {
      return null;
    }

    @Nullable
    @Override
    protected Map<String, Long> getPartitionTimeLag()
    {
      return null;
    }

    @Override
    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
        int groupId,
        Map<String, String> startPartitions,
        Map<String, String> endPartitions,
        String baseSequenceName,
        DateTime minimumMessageTime,
        DateTime maximumMessageTime,
        Set<String> exclusiveStartSequenceNumberPartitions,
        SeekableStreamSupervisorIOConfig ioConfig
    )
    {
      return new SeekableStreamIndexTaskIOConfig<String, String>(
          groupId,
          baseSequenceName,
          new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
          new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
          true,
          minimumMessageTime,
          maximumMessageTime,
          ioConfig.getInputFormat()
      )
      {
      };
    }

    @Override
    protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
        int replicas,
        String baseSequenceName,
        ObjectMapper sortingMapper,
        TreeMap<Integer, Map<String, String>> sequenceOffsets,
        SeekableStreamIndexTaskIOConfig taskIoConfig,
        SeekableStreamIndexTaskTuningConfig taskTuningConfig,
        RowIngestionMetersFactory rowIngestionMetersFactory
    )
    {
      return ImmutableList.of(new TestSeekableStreamIndexTask(
          "id",
          null,
          getDataSchema(),
          taskTuningConfig,
          taskIoConfig,
          null,
          null
      ));
    }

    @Override
    protected int getTaskGroupIdForPartition(String partition)
    {
      try {
        return Integer.parseInt(partition) % spec.getIoConfig().getTaskCount();
      }
      catch (NumberFormatException e) {
        return 0;
      }
    }

    @Override
    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
    {
      return true;
    }

    @Override
    protected boolean doesTaskTypeMatchSupervisor(Task task)
    {
      return true;
    }

    @Override
    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
        String stream,
        Map<String, String> map
    )
    {
      return null;
    }

    @Override
    protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
    {
      return new OrderedSequenceNumber<String>(seq, isExclusive)
      {
        @Override
        public int compareTo(OrderedSequenceNumber<String> o)
        {
          return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
        }
      };
    }

    @Override
    protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
    {
      return null;
    }

    @Override
    protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
    {
      return null;
    }

    @Override
    protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
    {
      return SeekableStreamSupervisorStateTest.this.recordSupplier;
    }

    @Override
    protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
        int numPartitions,
        boolean includeOffsets
    )
    {
      return new SeekableStreamSupervisorReportPayload<String, String>(
          DATASOURCE,
          STREAM,
          1,
          1,
          1L,
          null,
          null,
          null,
          null,
          null,
          null,
          false,
          true,
          null,
          null,
          null
      )
      {
      };
    }

    @Override
    protected String getNotSetMarker()
    {
      return "NOT_SET";
    }

    @Override
    protected String getEndOfPartitionMarker()
    {
      return "EOF";
    }

    @Override
    protected boolean isEndOfShard(String seqNum)
    {
      return false;
    }

    @Override
    protected boolean isShardExpirationMarker(String seqNum)
    {
      return false;
    }

    @Override
    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
    {
      return false;
    }
  }

  private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
  {
    @Override
    protected void scheduleReporting(ScheduledExecutorService reportingExec)
    {
      // do nothing
    }

    @Override
    public LagStats computeLagStats()
    {
      return new LagStats(0, 0, 0);
    }
  }

  private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
  {
    private final CountDownLatch latch;
    private final Map<String, Long> partitionsRecordLag;
    private final Map<String, Long> partitionsTimeLag;
    private final byte metricFlag;

    private static final byte LAG = 0x01;
    private static final byte NOTICE_QUEUE = 0x02;
    private static final byte NOTICE_PROCESS = 0x04;


    TestEmittingTestSeekableStreamSupervisor(
        CountDownLatch latch,
        byte metricFlag,
        Map<String, Long> partitionsRecordLag,
        Map<String, Long> partitionsTimeLag
    )
    {
      this.latch = latch;
      this.metricFlag = metricFlag;
      this.partitionsRecordLag = partitionsRecordLag;
      this.partitionsTimeLag = partitionsTimeLag;
    }

    @Nullable
    @Override
    protected Map<String, Long> getPartitionRecordLag()
    {
      return partitionsRecordLag;
    }

    @Nullable
    @Override
    protected Map<String, Long> getPartitionTimeLag()
    {
      return partitionsTimeLag;
    }

    @Override
    protected void emitLag()
    {
      if ((metricFlag & LAG) == 0) {
        return;
      }
      super.emitLag();
      if (stateManager.isSteadyState()) {
        latch.countDown();
      }
    }

    @Override
    protected void emitNoticesQueueSize()
    {
      if ((metricFlag & NOTICE_QUEUE) == 0) {
        return;
      }
      super.emitNoticesQueueSize();
      latch.countDown();
    }

    @Override
    public void emitNoticeProcessTime(String noticeType, long timeInMillis)
    {
      if ((metricFlag & NOTICE_PROCESS) == 0) {
        return;
      }
      super.emitNoticeProcessTime(noticeType, timeInMillis);
      latch.countDown();
    }

    @Override
    public LagStats computeLagStats()
    {
      return null;
    }

    @Override
    protected void scheduleReporting(ScheduledExecutorService reportingExec)
    {
      SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig();
      reportingExec.scheduleAtFixedRate(
          this::emitLag,
          ioConfig.getStartDelay().getMillis(),
          spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
          TimeUnit.MILLISECONDS
      );
      reportingExec.scheduleAtFixedRate(
          this::emitNoticesQueueSize,
          ioConfig.getStartDelay().getMillis(),
          spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
          TimeUnit.MILLISECONDS
      );
    }
  }

  private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
  {
    private final String taskType;
    private final TaskLocation location;
    private final String dataSource;

    TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
    {
      super(task.getId(), result);
      this.taskType = task.getType();
      this.location = location;
      this.dataSource = task.getDataSource();
    }

    @Override
    public TaskLocation getLocation()
    {
      return location;
    }

    @Override
    public String getTaskType()
    {
      return taskType;
    }

    @Override
    public String getDataSource()
    {
      return dataSource;
    }
  }

  private static class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata<String, String>
  {

    @JsonCreator
    public TestSeekableStreamDataSourceMetadata(
            @JsonProperty("partitions") SeekableStreamSequenceNumbers<String, String> partitions
    )
    {
      super(partitions);
    }

    @Override
    public DataSourceMetadata asStartMetadata()
    {
      final SeekableStreamSequenceNumbers<String, String> sequenceNumbers = getSeekableStreamSequenceNumbers();
      if (sequenceNumbers instanceof SeekableStreamEndSequenceNumbers) {
        return createConcreteDataSourceMetaData(
                ((SeekableStreamEndSequenceNumbers<String, String>) sequenceNumbers).asStartPartitions(true)
        );
      } else {
        return this;
      }
    }

    @Override
    protected SeekableStreamDataSourceMetadata<String, String> createConcreteDataSourceMetaData(
            SeekableStreamSequenceNumbers<String, String> seekableStreamSequenceNumbers
    )
    {
      return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers);
    }
  }

  private static SeekableStreamIndexTaskIOConfig createTaskIoConfigExt(
          int groupId,
          Map<String, String> startPartitions,
          Map<String, String> endPartitions,
          String baseSequenceName,
          DateTime minimumMessageTime,
          DateTime maximumMessageTime,
          Set<String> exclusiveStartSequenceNumberPartitions,
          SeekableStreamSupervisorIOConfig ioConfig
  )
  {
    return new SeekableStreamIndexTaskIOConfig<String, String>(
            groupId,
            baseSequenceName,
            new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
            new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
            true,
            minimumMessageTime,
            maximumMessageTime,
            ioConfig.getInputFormat()
    )
    {
    };
  }
}
